{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Working with IPython and dask.distributed\n",
    "\n",
    "[dask.distributed](https://distributed.readthedocs.io) is a cool library for doing distributed execution. You should check it out, if you haven't already.\n",
    "\n",
    "Assuming you already have an IPython cluster running:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[0, 1, 2, 3, 4, 5, 6, 7]"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import ipyparallel as ipp\n",
    "rc = ipp.Client()\n",
    "rc.ids"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can turn your IPython cluster into a distributed cluster by calling `Client.become_dask()`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<Executor: scheduler=\"172.16.3.46:52245\" processes=9 cores=9>"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "executor = rc.become_dask(ncores=1)\n",
    "executor"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This will:\n",
    "\n",
    "1. start a Scheduler on the Hub\n",
    "2. start a Worker on each engine\n",
    "3. return an Executor, the distributed client API\n",
    "\n",
    "By default, distributed Workers will use threads to run on all cores of a machine. \n",
    "In this case, since I already have one *engine* per core,\n",
    "I tell distributed to run one core per Worker with `ncores=1`.\n",
    "\n",
    "We can now use our IPython cluster with distributed:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from distributed import progress\n",
    "\n",
    "def square(x):\n",
    "    return x ** 2\n",
    "\n",
    "def neg(x):\n",
    "        return -x\n",
    "\n",
    "A = executor.map(square, range(1000))\n",
    "B = executor.map(neg, A)\n",
    "total = executor.submit(sum, B)\n",
    "progress(total)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "-332833500"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "total.result()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "I could also let distributed do its multithreading thing, and run one multi-threaded Worker per engine.\n",
    "\n",
    "First, I need to get a mapping of one engine per host:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{0: 'k5.simula.no',\n",
       " 1: 'k5.simula.no',\n",
       " 2: 'k5.simula.no',\n",
       " 3: 'k5.simula.no',\n",
       " 4: 'k5.simula.no',\n",
       " 5: 'k5.simula.no',\n",
       " 6: 'k5.simula.no',\n",
       " 7: 'k5.simula.no'}"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import socket\n",
    "\n",
    "engine_hosts = rc[:].apply_async(socket.gethostname).get_dict()\n",
    "engine_hosts"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "I can reverse this mapping, to get a list of engines on each host:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'k5.simula.no': [0, 1, 2, 3, 4, 5, 6, 7]}"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "host_engines = {}\n",
    "for engine_id, host in engine_hosts.items():\n",
    "    if host not in host_engines:\n",
    "        host_engines[host] = []\n",
    "    host_engines[host].append(engine_id)\n",
    "\n",
    "host_engines"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now I can get one engine per host:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[0]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "one_engine_per_host = [ engines[0] for engines in host_engines.values()]\n",
    "one_engine_per_host"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "*Here's a concise, but more opaque version that does the same thing:*"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[7]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "one_engine_per_host = list({host:eid for eid,host in engine_hosts.items()}.values())\n",
    "one_engine_per_host"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "I can now stop the first distributed cluster, and start a new one on just these engines, letting distributed allocate threads:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "distributed.executor - INFO - Reconnecting...\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<Executor: scheduler=\"172.16.3.46:59120\" processes=1 cores=1>"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rc.stop_distributed()\n",
    "\n",
    "executor = rc.become_dask(one_engine_per_host)\n",
    "executor"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And submit the same tasks again:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Widget Javascript not detected.  It may not be installed properly. Did you enable the widgetsnbextension? If not, then run \"jupyter nbextension enable --py --sys-prefix widgetsnbextension\"\n"
     ]
    }
   ],
   "source": [
    "A = executor.map(square, range(100))\n",
    "B = executor.map(neg, A)\n",
    "total = executor.submit(sum, B)\n",
    "progress(total)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Debugging distributed with IPython"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "distributed.executor - INFO - Reconnecting...\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<Executor: scheduler=\"172.16.3.46:59142\" processes=1 cores=1>"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rc.stop_distributed()\n",
    "\n",
    "executor = rc.become_dask(one_engine_per_host)\n",
    "executor"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's set the %px magics to only run on our one engine per host:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "view = rc[one_engine_per_host]\n",
    "view.block = True\n",
    "view.activate()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's submit some work that's going to fail somewhere in the middle:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Widget Javascript not detected.  It may not be installed properly. Did you enable the widgetsnbextension? If not, then run \"jupyter nbextension enable --py --sys-prefix widgetsnbextension\"\n"
     ]
    },
    {
     "ename": "ZeroDivisionError",
     "evalue": "division by zero",
     "output_type": "error",
     "traceback": [
      "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[1;31mZeroDivisionError\u001b[0m                         Traceback (most recent call last)",
      "\u001b[1;32m<ipython-input-13-183a85878b6a>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m\n\u001b[0;32m     13\u001b[0m \u001b[0mtotal\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mexecutor\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msubmit\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0msum\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0minverted\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m     14\u001b[0m \u001b[0mdisplay\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mprogress\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mtotal\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 15\u001b[1;33m \u001b[0mtotal\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mresult\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m",
      "\u001b[1;32m/Users/benjaminrk/dev/py/distributed/distributed/executor.py\u001b[0m in \u001b[0;36mresult\u001b[1;34m(self)\u001b[0m\n\u001b[0;32m    100\u001b[0m         \u001b[0mresult\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0msync\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mexecutor\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mloop\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_result\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mraiseit\u001b[0m\u001b[1;33m=\u001b[0m\u001b[1;32mFalse\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    101\u001b[0m         \u001b[1;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mstatus\u001b[0m \u001b[1;33m==\u001b[0m \u001b[1;34m'error'\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 102\u001b[1;33m             \u001b[0msix\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mreraise\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m*\u001b[0m\u001b[0mresult\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m    103\u001b[0m         \u001b[1;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mstatus\u001b[0m \u001b[1;33m==\u001b[0m \u001b[1;34m'cancelled'\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    104\u001b[0m             \u001b[1;32mraise\u001b[0m \u001b[0mresult\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32m/Users/benjaminrk/conda/lib/python3.5/site-packages/six.py\u001b[0m in \u001b[0;36mreraise\u001b[1;34m(tp, value, tb)\u001b[0m\n\u001b[0;32m    683\u001b[0m             \u001b[0mvalue\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mtp\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    684\u001b[0m         \u001b[1;32mif\u001b[0m \u001b[0mvalue\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m__traceback__\u001b[0m \u001b[1;32mis\u001b[0m \u001b[1;32mnot\u001b[0m \u001b[0mtb\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 685\u001b[1;33m             \u001b[1;32mraise\u001b[0m \u001b[0mvalue\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mwith_traceback\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mtb\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m    686\u001b[0m         \u001b[1;32mraise\u001b[0m \u001b[0mvalue\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    687\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32m<ipython-input-13-183a85878b6a>\u001b[0m in \u001b[0;36minverse\u001b[1;34m()\u001b[0m\n\u001b[0;32m      6\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m      7\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0minverse\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mx\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m----> 8\u001b[1;33m     \u001b[1;32mreturn\u001b[0m \u001b[1;36m1\u001b[0m \u001b[1;33m/\u001b[0m \u001b[0mx\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m      9\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m     10\u001b[0m \u001b[0mshifted\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mexecutor\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mmap\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mshift5\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mrange\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;36m10\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;31mZeroDivisionError\u001b[0m: division by zero"
     ]
    }
   ],
   "source": [
    "from IPython.display import display\n",
    "from distributed import progress\n",
    "\n",
    "def shift5(x):\n",
    "    return x - 5\n",
    "\n",
    "def inverse(x):\n",
    "    return 1 / x\n",
    "\n",
    "shifted = executor.map(shift5, range(1, 10))\n",
    "inverted = executor.map(inverse, shifted)\n",
    "                       \n",
    "total = executor.submit(sum, inverted)\n",
    "display(progress(total))\n",
    "total.result()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can see which task failed:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[<Future: status: error, key: inverse-f8907aa30adc310cc8168553500ca8bb>]"
      ]
     },
     "execution_count": 14,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "[ f for f in inverted if f.status == 'error' ]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When IPython starts a worker on each engine,\n",
    "it stores it in the `distributed_worker` variable in the engine's namespace.\n",
    "This lets us query the worker interactively.\n",
    "\n",
    "We can check out the current data resident on each worker:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "\u001b[0;31mOut[7:2]: \u001b[0m\n",
       "{'inverse-07072811957c38188d819607f8020bed': 0.3333333333333333,\n",
       " 'inverse-0994af96c984b7254e2437daa46df6c8': 1.0,\n",
       " 'inverse-1934b1ad8662540a6b1a321502d3d81e': 0.25,\n",
       " 'inverse-2e0af360f3e400c0360eaa3351e80a4d': -1.0,\n",
       " 'inverse-8ef20ef722160668e84ab435b8293751': -0.5,\n",
       " 'inverse-bee9906329afc3cb86cc241209453f56': -0.3333333333333333,\n",
       " 'inverse-cfd3e5b72a33fd2fa85c683107287cf9': -0.25,\n",
       " 'inverse-d9ed866e67ebc068f6561f9263c4cf73': 0.5,\n",
       " 'shift5-17c829bc866d38df11bb25ffc7ea887f': -3,\n",
       " 'shift5-3035396f215ce921eda38f8f36ca3e90': 4,\n",
       " 'shift5-4951afd99368d41997f42a2f823f566f': 2,\n",
       " 'shift5-5c9f9254c4a34e7571d53ee4839ea6f2': 1,\n",
       " 'shift5-8458d8715078405cb9dfed60d1c3d26a': -2,\n",
       " 'shift5-899e24c059f86698e06254cfd5f3f4ea': -1,\n",
       " 'shift5-9326e9993993cb1c08355c6e5b8e5970': -4,\n",
       " 'shift5-cabacfd5aaf525d183d932617b8eac5a': 0,\n",
       " 'shift5-e233bf13876414d6a0a4817695ac7ca1': 3}"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "%%px\n",
    "dask_worker.data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "Now that we can poke around with each Worker,\n",
    "we can have a slightly easier time figuring out what went wrong."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.1"
  },
  "widgets": {
   "state": {
    "0db2b058b609455fa55654e5e3565453": {
     "views": [
      {
       "cell_index": 24
      }
     ]
    },
    "58612800fe1d42058d0cbc83f0534655": {
     "views": [
      {
       "cell_index": 18
      }
     ]
    },
    "f4647eb4300e4eb6bcdae457df082cc7": {
     "views": [
      {
       "cell_index": 5
      }
     ]
    }
   },
   "version": "1.2.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
